Audioデータをクラウドに送ってみました。 MQTT(basic ingest) + Amazon Kinesis Data Streams + Amazon Kinesis Data Firehose + S3 (バイナリ)

Audioデータをクラウドに送ってみました。 MQTT(basic ingest) + Amazon Kinesis Data Streams + Amazon Kinesis Data Firehose + S3 (バイナリ)

Clock Icon2021.04.24

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

CX事業本部の平内(SIN)です。

ここまで、エッジ側のAudioデータをクラウドへ送信する要領をいくつか試してみました。

上記のうち、最後だけは、バイナリ型式でデータを送ってみたのですが、実は、AWS IoT SDKのMQTTクライアントのPublishもpayloadは、bufferとなってます。


https://awslabs.github.io/aws-crt-python/api/mqtt.html?#awscrt.mqtt.Connection.publish

今回は、AudioデータをJSON化せず、バイナリ形式のまま送信する要領を確認してみました。

2 構成

構成は、前回とほぼ同じです。

デバイスからは、500msec単位で、Audioデータ(バイナリ形式)とタイムスタンプ(バイナリ形式)を結合したものをPayloadとしてMQTTで送信します。

Amazon Kinesis Data StreamsAmazon Kinesis Data Firehose及び、S3バケットでは、バイナリ形式のまま保存されます。

バイナリデータは、タイムスタンプとAudioのROWデータを結合して、1データが、8008byteです。

下記は、後で出てくる、basic ingestを使用せず、通常のTopicで送信したものをAWSのコンソールでsubscribeしている様子ですが、・・・当然文字化けしてます。

3 basic ingest

今回は、メッセージブローカーを通さずに、basic ingestで送信してみました。
Reducing messaging costs with basic ingest

subscribeする要件がなく、単純にルールエンジンで処理するだけであれば、メッセージブローカーを経由しなければ、その分のコスト削減が可能です。メッセージブローカーのコストは、他に比べて比較的高いので、データ量が多い場合は、是非検討したいところです。

basic ingestを使用するには、送信側でtopic名を($aws/rules/ルール名/元のTopic名)に変えるだけです。(ルールエンジンに側に変更は必要ありません)

作成しているルール名が、audio_transmission_ruleとなているので、変更は、下記のようになってます。

  • 元のTopic名
topic/audio_transmission

  • 変更後のTopic名
$aws/rules/audio_transmission_rule/topic/audio_transmission

ルールエンジンからAmazon Kinesis Data StreamsAmazon Kinesis Data Firehoseを経由して、最終的にS3に保存されている様子です。

4 MQTT(バイナリ)

MQTTで送信しているコードです。

publishのpayloadに、バイナリデータをそのまま指定してます。また、Topic名は、basic ingestを使用するため変更されています。

index.py

import pyaudio
from producer import Producer
import numpy as np

DEVICE_INDEX = 0
CHANNELS = 2 
SAMPLE_RATE = 32000 # サンプルレート
FORMAT = pyaudio.paInt16
CHUNK = int(SAMPLE_RATE/2) # 500msごとに取得する

# open stream
p = pyaudio.PyAudio()
stream = p.open(format = FORMAT,
                channels = CHANNELS,
                rate = SAMPLE_RATE,
                input =  True,
                input_device_index = DEVICE_INDEX,
                frames_per_buffer = CHUNK)

producer = Producer()

try:
    print("start ...")
    while True: 
        # 500ms分のデータ読み込み
        data = stream.read(CHUNK)

        # numpy配列に変換    
        data = np.frombuffer(data, dtype="int16")
        # チャンネル 2ch -> 1ch
        data = data[0::2] 
        # サンプルレート  32000Hz -> 8000Hz
        data = data[0::4] 
        # byteに戻す 
        data = data.tobytes()

        producer.send(data)
except:
    stream.stop_stream()
    stream.close()
    p.terminate()

producer.py

from mqtt import Mqtt
import json
from datetime import datetime
import struct

class Producer():
    def __init__(self):
        # basic ingest
        self.__topic = "$aws/rules/audio_transmission_rule/topic/audio_transmission"
        root_ca = "./certs/RootCA.pem"
        cert = "./certs/xxxxxxxxxx-certificate.pem.crt"
        key = "./certs/xxxxxxxxxx-private.pem.key"
        endpoint = "xxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"

        self.__mqtt = Mqtt(root_ca, key, cert, endpoint)

    def send(self, data):
        now = datetime.now()
        # 時刻とデータの結合
        ts = now.timestamp()
        ts = struct.pack('<d', ts)
        transfer_data = ts + data
        try:
            self.__mqtt.publish(self.__topic, transfer_data)
            print("publish {}byte".format(len(transfer_data)))
        except Exception as e:
            print("Exception: {}", e.args)

5 その他

Amazon Kinesis Data StreamsAmazon Kinesis Data Firehoseや、S3に保存されている形式は、前回と変わりませんので、その他の部分のコードに変更はありません。

6 最後に

今回は、MQTTでバイナリデータを送信する要領を確認してみました。

エッジ側で発生したRAWデータを、そのままの形でクラウドに持ってくることは、要件によっては、有効かもしれません。 AWS IoT Analyticsなどでデータストアに一旦蓄積してから、改めて分析等に適した形に操作するというのも有りかも知れません。

全てのコードは下記に置きました
https://github.com/furuya02/AudioTransmission/tree/main/sample_5

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.